#include "config.h"

#include <sys/types.h>
#include <sys/wait.h>
#include <rpc/pmap_clnt.h>
#include <errno.h>
#include <pthread.h>
#include <getopt.h>
#include <sys/file.h>
#include <sys/vfs.h>
#include <sys/time.h>
#include <sys/resource.h>

#define DBG_SUBSYS S_LIBINTERFACE

#include <pthread.h>

#include "lichbd.h"
#include "vm.h"
#include "sock_unix.h"
#include "stor_root.h"
#include "../../storage/controller/volume_ctl.h"
#include "get_version.h"
#include "rpc_proto.h"
#include "mem_cache.h"
#include "lichbd_rpc.h"
#include "job_dock.h"
#include "lich_aio.h"
#include "lichbd_cache.h"
#include "../../ynet/sock/sock_tcp.h"
#include "../../ynet/net/net_events.h"
#include "rpc_table.h"
#include "lichstor.h"
#include "analysis.h"
#include "dbg.h"

typedef struct {
        uint64_t max;
        uint32_t count;
        uint32_t *array;
} lichbd_analysis_arg_t;

typedef struct {
        int idx;
        uint64_t clock;
        lichbd_cache_t *cache;
} load_arg_t;

typedef struct {
        struct list_head hook;
        task_t task;
} wait_arg_t;

#define LICHBD_CACHE_SPLIT (1024 * 1024 / 8)
#define LICHBD_CACHE_LOAD_MAX (300 * 8)
#define LICHBD_CACHE_MAX (LICHBD_CACHE_SPLIT / 2)
#define LICHBD_HIT ((LLU)1000 * 1000 * 1000 * 1000)
#define LICHBD_CACHE_CHECK_INTERVAL 20
#define LICHBD_CACHE_LOAD_INTERVAL 5
#define LICHBD_CACHE_RESET (60 * 2)
#define LICHBD_ANALYSIS_ARRAY 100
#define LICHBD_CACHE_LOCK "lichbd_cache.lock"

static char __home__[MAX_PATH_LEN];

inline static uint64_t __lichbd_cache_size2split(uint64_t size)
{
        uint64_t chknum = size / LICHBD_CACHE_SPLIT;
        if (size % LICHBD_CACHE_SPLIT)
                chknum++;

        return chknum;
}

static int __lichbd_cache_lock(const char *_path, int flag)
{
        int ret, fd;
        char path[MAX_PATH_LEN];
        struct stat stbuf, newbuf;

retry:
        snprintf(path, MAX_PATH_LEN, "%s/%s", _path, LICHBD_CACHE_LOCK);
        ret = path_validate(path, YLIB_NOTDIR, YLIB_DIRCREATE);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_ret, ret);
        }

        fd = open(path, O_RDONLY | O_CLOEXEC | O_CREAT, 0644);
        if (fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = flock(fd, LOCK_EX | flag);
        if (ret == -1) {
                ret = errno;
                DBUG("lock %s fail, ret (%u) %s\n", path, ret, strerror(ret));
                GOTO(err_fd, ret);
        }

        ret = fstat(fd, &stbuf);
        if (ret < 0) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        ret = stat(path, &newbuf);
        if (ret < 0) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        if (newbuf.st_ino != stbuf.st_ino) {
                flock(fd, LOCK_UN);
                close(fd);
                goto retry;
        }

        return fd;
err_fd:
        close(fd);
err_ret:
        return -ret;
}

static void __lichbd_cache_unlock(const char *_path, int fd)
{
        char path[MAX_PATH_LEN];

        snprintf(path, MAX_PATH_LEN, "%s/%s", _path, LICHBD_CACHE_LOCK);
        unlink(path);

        flock(fd, LOCK_UN);
}

int lichbd_cache_create(lichbd_cache_t **_cache, const char *name, uint64_t size)
{
        int ret, i, fd;
        lichbd_cache_t *cache;
        cache_chunk_t *chunk;
        time_t now;
        char path[MAX_PATH_LEN];

        snprintf(path, MAX_PATH_LEN, "%s/%s", __home__, name);
        fd = __lichbd_cache_lock(path, 0);
        if (fd < 0) {
                ret = -fd;
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&cache, sizeof(*cache));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(cache, 0x0, sizeof(*cache));
        ret = ymalloc((void **)&cache->chunk, sizeof(cache_chunk_t) * __lichbd_cache_size2split(size));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(cache->chunk, 0x0, sizeof(cache_chunk_t) * __lichbd_cache_size2split(size));

        now = gettime();
        cache->count = __lichbd_cache_size2split(size);
        for (i = 0; i < (int)cache->count; i++) {
                chunk = &cache->chunk[i];
                chunk->begin = now;
                chunk->last_update = now;
                chunk->cached = 0;
                chunk->loading = 0;
                chunk->clock0 = 0;
                chunk->clock1 = 0;
                INIT_LIST_HEAD(&chunk->wlist);
        }

        strcpy(cache->name, name);
        *_cache = cache;


        DINFO("lichbd cache %s create\n", name);

        return 0;
err_ret:
        return ret;
}

void lichbd_cache_destroy(lichbd_cache_t *cache)
{
        yfree((void **)&cache->chunk);
        yfree((void **)&cache);
}

static int __lichbd_cleanup_iterator__(const char *parent, const char *key, void *args)
{
        char path[MAX_PATH_LEN];

        (void) args;

        snprintf(path, MAX_PATH_LEN, "%s/%s", parent, key);
        unlink(path);
        rmdir(path);

        return 0;
}

static int __lichbd_cleanup_iterator(const char *parent, const char *key, void *args)
{
        int ret, fd;
        char path[MAX_PATH_LEN];

        (void) args;

        if (memcmp(key, "file", strlen("file"))) {
                DWARN("skip %s\n", key);
                return 0;
        }

        snprintf(path, MAX_PATH_LEN, "%s/%s", parent, key);
        fd = __lichbd_cache_lock(path, LOCK_NB);
        if (fd < 0) {
                ret = -fd;
                if (ret == EBUSY || ret == EWOULDBLOCK || ret == EAGAIN) {
                        DWARN("skip %s\n", key);
                        return 0;
                } else
                        GOTO(err_ret, ret);
        }

        ret = _dir_iterator(path, __lichbd_cleanup_iterator__, NULL);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __lichbd_cache_unlock(path ,fd);

        return 0;
err_lock:
        __lichbd_cache_unlock(path ,fd);
err_ret:
        return ret;
}

static int __lichbd_cache_max_iterator(const char *parent, const char *key, void *args)
{
        int ret;
        uint64_t *max = args, value;
        char path[MAX_PATH_LEN], tmp[MAX_NAME_LEN];
        struct stat stbuf;

        if (strcmp(key, "edge") == 0)
                return 0;

        snprintf(path, MAX_PATH_LEN, "%s/%s", parent, key);
        ret = stat(path, &stbuf);
        if (gettime() - stbuf.st_mtime > LICHBD_CACHE_CHECK_INTERVAL * 2) {
                DWARN("%s timeout\n", path);
                unlink(path);
                return 0;
        }

        if (strcmp(&key[strlen(key) - strlen("max")], "max")) {
                return 0;
        }

        ret = _get_value(path, tmp, MAX_BUF_LEN);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        value = atoi(tmp);

        *max = *max > value ? *max : value;

        unlink(path);

        return 0;
err_ret:
        return ret;
}

static int __lichbd_cache_iterator(const char *parent, const char *key, void *_arg)
{
        int ret, fd;
        uint64_t value, offset, idx;
        char path[MAX_PATH_LEN];
        lichbd_analysis_arg_t *arg = _arg;

        DBUG("check %s\n", key);

        if (strcmp(&key[strlen(key) - strlen("data")], "data")) {
                DWARN("skip %s\n", key);
                return 0;
        }

        snprintf(path, MAX_PATH_LEN, "%s/%s", parent, key);
        fd = open(path, O_RDONLY | O_CLOEXEC);
        if (fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }


        offset = 0;
        while (1) {
                ret = pread(fd, &value, sizeof(value), offset);
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_fd, ret);
                }

                offset += ret;
                DBUG("read %s fd %u offset %llu retval %u\n", key, fd, (LLU)offset, ret);

                if (ret == 0) {
                        break;
                }

                if (value > arg->max) {
                        DWARN("value %llu skipped\n", (LLU)value);
                        continue;
                }

                idx = (value * LICHBD_ANALYSIS_ARRAY) / arg->max;
                DBUG("value %llu idx %llu\n", (LLU)value, (LLU)idx);
                YASSERT(idx <= LICHBD_ANALYSIS_ARRAY);
                if (idx == LICHBD_ANALYSIS_ARRAY)
                        idx--;

                arg->array[idx] ++;
        }

        close(fd);
        unlink(path);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

static int __lichbd_cache_get_edge(lichbd_analysis_arg_t *arg)
{
        int ret, i;
        struct statfs buf;
        uint64_t df, chunks, count, edge;
        char path[MAX_PATH_LEN], tmp[MAX_BUF_LEN];

        ret = statfs(__home__, &buf);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        df = (LLU)buf.f_bsize * buf.f_bfree;
        chunks = (df - (LLU)1024 * 1024 * 1024 * 10) / LICHBD_CACHE_SPLIT;

#if 0
        for (i = LICHBD_ANALYSIS_ARRAY - 1; i >= 0; i--) {
                DWARN("array[%u] count %llu\n", i, (LLU)arg->array[i]);
        }
#endif

        count = 0;
        for (i = LICHBD_ANALYSIS_ARRAY - 1; i >= 0; i--) {
                if (count + arg->array[i] > chunks) {
                        break;
                }

                count +=  arg->array[i];
        }

        edge = arg->max * (i + 1) / LICHBD_ANALYSIS_ARRAY;
        DWARN("free %llu max chunk %llu count %llu edge %llu\n", (LLU)df,
              (LLU)chunks, (LLU)count, (LLU)edge);

        snprintf(tmp, MAX_BUF_LEN, "%llu", (LLU)edge);
        snprintf(path, MAX_PATH_LEN, "%s/analysis/edge", __home__);
        ret = _set_text(path, tmp, strlen(tmp) + 1, O_CREAT | O_TRUNC);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


static int __lichbd_cache_bh()
{
        int ret;
        char path[MAX_PATH_LEN];
        uint64_t max = 0;
        lichbd_analysis_arg_t arg;

        _dir_iterator(__home__, __lichbd_cleanup_iterator, NULL);

        snprintf(path, MAX_PATH_LEN, "%s/analysis", __home__);

        ret = _dir_iterator(path, __lichbd_cache_max_iterator, &max);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("max %llu\n", (LLU)max);

        arg.max = max;
        arg.count = LICHBD_ANALYSIS_ARRAY;

        ret = ymalloc((void **)&arg.array, sizeof(uint32_t) * LICHBD_ANALYSIS_ARRAY);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = _dir_iterator(path, __lichbd_cache_iterator, &arg);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = __lichbd_cache_get_edge(&arg);
        if (unlikely(ret))
                GOTO(err_free, ret);

        yfree((void **)&arg.array);


        return 0;
err_free:
        yfree((void **)&arg.array);
err_ret:
        return ret;
}

static void *__lichbd_cache_worker(void *arg)
{
        int ret, fd;
        const char *path = arg;

        DINFO("start...\n");

        while (1) {
                fd = __lichbd_cache_lock(path,  LOCK_NB);
                if (fd < 0) {
                        ret = -fd;
                        if (ret == EBUSY || ret == EWOULDBLOCK) {
                                sleep(10);
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                break;
        }

        while (1) {
                sleep(LICHBD_CACHE_CHECK_INTERVAL * 2);

                ret = __lichbd_cache_bh();
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return NULL;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

int lichbd_cache_init(const char *path)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;
        char _path[MAX_PATH_LEN];

        strcpy(__home__, path);

        snprintf(_path, MAX_PATH_LEN, "%s/analysis", path);

        ret = path_validate(_path, YLIB_ISDIR, YLIB_DIRCREATE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __lichbd_cache_worker, (void *)path);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DWARN("lichbd cache init\n");

        return 0;
err_ret:
        return ret;
}

static void __lichbd_cache_update__(cache_chunk_t *chunk, time_t now, uint64_t *_iops)
{
        uint64_t used, iops;

        if (chunk->hit == 0) {
                *_iops = 0;
                return;
        }

        used = gettime() - chunk->begin;
        if (used < 1) {
                used = 1;
        }

        iops = chunk->hit / used;

        if (now - chunk->begin > LICHBD_CACHE_RESET && chunk->hit) {
                DINFO("reset, iops %llu, hit %llu\n", (LLU)iops, (LLU)chunk->hit);
                chunk->hit = iops;
                chunk->begin = now;
        }

        *_iops = iops;
}

static void __lichbd_cache_resume(cache_chunk_t *chunk)
{
        struct list_head *pos, *n;
        wait_arg_t *wait_arg = NULL;

        list_for_each_safe(pos, n, &chunk->wlist) {
                wait_arg = (void *)wait_arg;
                list_del(&wait_arg->hook);

                schedule_resume(&wait_arg->task, 0, NULL);
        }
}

static void __lichbd_cache_drop(const char *name, int idx)
{
        int hash;
        char path[MAX_PATH_LEN];

        hash = idx % 1024;
        sprintf(path, "%s/%s/%u/%u", __home__, name, hash, idx);
        unlink(path);

        DWARN("drop %s\n", path);
}

static int __lichbd_cache_open(const char *name, int idx, int flag)
{ 
        int ret, fd, hash;
        char path[MAX_PATH_LEN];

        hash = idx % 1024;
        sprintf(path, "%s/%s/%u/%u", __home__, name, hash, idx);

retry:
        fd = open(path, flag, 0644);
        if (fd < 0) {
                ret = errno;
                if (ret == ENOENT && flag & O_CREAT) {
                        ret = path_validate(path, YLIB_NOTDIR, YLIB_DIRCREATE);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        goto retry;
                }

                GOTO(err_ret, ret);
        }

        return fd;
err_ret:
        return -ret;
}

static void __lichbd_cache_load__(void *_arg)
{
        int ret, fd, idx, iov_count;
        buffer_t buf;
        cache_chunk_t *chunk;
        lichbd_cache_t *cache;
        load_arg_t *arg = _arg;
        task_t task;
        struct iocb iocb;
        struct iovec iov[LICHBD_CACHE_SPLIT * 5 / PAGE_SIZE];
        int localize = 0;

        cache = arg->cache;
        idx = arg->idx;
        chunk = &cache->chunk[idx];

        DBUG("load idx %u\n", idx);

        if (chunk->clock0 != arg->clock || chunk->clock0 != chunk->clock1) {
                DWARN("chunk %s[%u] changed\n", cache->name, arg->idx);
                ret = ESTALE;
                GOTO(err_ret, ret);
        }

        mbuffer_init(&buf, 0);
        ret = lichbd_rpc_read(&buf, LICHBD_CACHE_SPLIT, (LLU)idx * LICHBD_CACHE_SPLIT, localize);
        if (unlikely(ret))
                GOTO(err_free, ret);

        if (chunk->clock0 != arg->clock || chunk->clock0 != chunk->clock1) {
                DWARN("chunk %s[%u] changed\n", cache->name, arg->idx);
                ret = ESTALE;
                GOTO(err_free, ret);
        }

        fd = __lichbd_cache_open(cache->name, idx, O_RDWR | O_CREAT | O_SYNC);
        if (fd < 0) {
                ret = errno;
                GOTO(err_free, ret);
        }

#if 0
        ret = mbuffer_writefile(&buf, fd, 0);
        if (unlikely(ret)) {
                GOTO(err_fd, ret);
        }

#else

        task = schedule_task_get();
        iov_count = LICHBD_CACHE_SPLIT * 5 / PAGE_SIZE;
        ret = mbuffer_trans(iov, &iov_count,  &buf);
        YASSERT(ret == (int)buf.len);
        io_prep_pwritev(&iocb, fd, iov, iov_count, 0);
        iocb.aio_reqprio = 0;
        iocb.data = &task;

        ret = aio_commit(&iocb);
        if (unlikely(ret))
                GOTO(err_fd, ret);

#endif

        close(fd);

        chunk->cached = 1;

        mbuffer_free(&buf);
        __lichbd_cache_resume(chunk);
        chunk->loading = 0;
        yfree((void **)&arg);

        DINFO("load %s[%u] success\n", cache->name, idx);
        cache->loading--;

        return;
err_fd:
        close(fd);
err_free:
        mbuffer_free(&buf);
err_ret:
        DINFO("load %s[%u] fail\n", cache->name, idx);
        __lichbd_cache_resume(chunk);
        chunk->loading = 0;
        cache->loading--;
        yfree((void **)&arg);
        return;
}

static void __lichbd_cache_load(lichbd_cache_t *cache)
{
        int ret, i, count;
        uint64_t iops;
        cache_chunk_t *chunk;
        time_t now;
        load_arg_t *arg;

        if (cache->loading >= LICHBD_CACHE_LOAD_MAX) {
                DWARN("loading %u, skip\n", cache->loading);
        }

        DWARN("loading...\n");

        now = gettime();
        count = 0;
        for (i = 0; i < (int)cache->count; i++) {
                chunk = &cache->chunk[i];

                __lichbd_cache_update__(chunk, now, &iops);

#if 1
                if (now - chunk->last_update < 1) {
                        continue;
                }
#endif

                if (!chunk->cached && iops > cache->edge) {
                        ret = ymalloc((void **)&arg, sizeof(arg));
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);

                        arg->cache = cache;
                        arg->idx = i;
                        arg->clock = chunk->clock0;
                        chunk->loading = 1;
                        cache->loading++;

                        schedule_task_new("lichbd_cache_load", __lichbd_cache_load__, arg, -1);

                        DWARN("load %s[%u]\n", cache->name, i);

                        count++;
                        if (count >= LICHBD_CACHE_LOAD_MAX)
                                break;

                        if (cache->loading >= LICHBD_CACHE_LOAD_MAX) {
                                DWARN("loading %u, skip\n", cache->loading);
                                break;
                        }
                }
        }
}

inline static void __lichbd_cache_wait(cache_chunk_t *chunk, const char *name)
{
        wait_arg_t wait_arg;

        if (!chunk->loading)
                return;

        wait_arg.task = schedule_task_get();
        list_add_tail(&wait_arg.hook, &chunk->wlist);

        DWARN("cache wait %s\n", name);
        schedule_yield(name, chunk, -1, NULL);
        DWARN("cache resume %s\n", name);
}

static int __lichbd_cache_read(const char *name, int idx, buffer_t *buf, size_t size, off_t offset)
{
        int ret, iov_count, fd;
        struct iocb iocb;
        struct iovec iov[LICHBD_CACHE_SPLIT / PAGE_SIZE];
        task_t task = schedule_task_get();

        DBUG("lichbd cache read\n");

        mbuffer_init(buf, size);
        iov_count = LICHBD_CACHE_SPLIT / PAGE_SIZE;
        ret = mbuffer_trans(iov, &iov_count,  buf);
        YASSERT(ret == (int)buf->len);

        fd = __lichbd_cache_open(name, idx, O_RDWR | O_DIRECT);
        if (fd < 0) {
                ret = -fd;
                GOTO(err_ret, ret);
        }

        io_prep_preadv(&iocb, fd, iov, iov_count, offset % LICHBD_CACHE_SPLIT);
        iocb.aio_reqprio = 0;
        iocb.data = &task;

        DBUG("aio yield, task %u\n", task.taskid);
        ret = aio_commit(&iocb);
        if (unlikely(ret)) {
                GOTO(err_fd, ret);
        }

        DBUG("aio resume, task %u\n", task.taskid);

        close(fd);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

#if 1
static int __lichbd_cache_write(const char *name, int idx, const buffer_t *buf, size_t size, off_t offset)
{
        int ret, iov_count, fd;
        struct iocb iocb;
        struct iovec iov[LICH_IOV_MAX + 1];
        task_t task = schedule_task_get();

        (void) size;

        DBUG("lichbd cache write\n");

        iov_count = LICH_IOV_MAX + 1;
        ret = mbuffer_trans(iov, &iov_count,  buf);
        YASSERT(ret == (int)buf->len);

        fd = __lichbd_cache_open(name, idx, O_RDWR | O_DIRECT);
        if (fd < 0) {
                ret = -fd;
                GOTO(err_ret, ret);
        }

        io_prep_pwritev(&iocb, fd, iov, iov_count, offset % LICHBD_CACHE_SPLIT);
        iocb.aio_reqprio = 0;
        iocb.data = &task;

        ret = aio_commit(&iocb);
        if (unlikely(ret)) {
                GOTO(err_fd, ret);
        }

        DBUG("aio resume, task %u\n", task.taskid);
        close(fd);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

#else

static int __lichbd_cache_write(cache_chunk_t *chunk, const buffer_t *buf, size_t size, off_t offset)
{
        int ret;

        (void) size;

        ret = mbuffer_writefile(buf, chunk->fd, offset % LICHBD_CACHE_SPLIT);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

#endif

int lichbd_cache_read(lichbd_cache_t *cache, buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        cache_chunk_t *chunk;
        uint32_t idx, max, min;
        uint64_t clock;
        int localize = 0;

        idx = offset / LICHBD_CACHE_SPLIT;

        DBUG("cache %p size %llu idx %llu %llu\n", cache, (LLU)size,
              (LLU)idx,  (LLU)__lichbd_cache_size2split(offset + size));

        min = offset / LICHBD_CACHE_SPLIT;
        max = __lichbd_cache_size2split(offset + size);
        if (cache && max - min == 1) {
                idx = min;
                chunk = &cache->chunk[idx];

                if (size <= LICHBD_CACHE_MAX)
                        chunk->hit += (LICHBD_CACHE_LOAD_INTERVAL * 2);

                if (chunk->cached && chunk->clock0 == chunk->clock1) {
                        YASSERT(!chunk->loading);

                        clock = chunk->clock0;
                        ret = __lichbd_cache_read(cache->name, idx, buf, size, offset);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }

                        if (!chunk->cached || chunk->clock0 != clock) {
                                mbuffer_free(buf);
                                mbuffer_init(buf, 0);

                                ret = lichbd_rpc_read(buf, size, offset, localize);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);
                        }

                        DBUG("read form cache %u %s.%d\n", (int)size, cache->name, idx);
                } else {
                        ret = lichbd_rpc_read(buf, size, offset, localize);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        } else {
                ret = lichbd_rpc_read(buf, size, offset, localize);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        DBUG("read %u\n", buf->len);

        return 0;
err_ret:
        return ret;
}

static void __lichbd_cache_clock0(lichbd_cache_t *cache, size_t size, off_t offset)
{
        cache_chunk_t *chunk;
        uint32_t min, max, idx;

        if (!cache)
                return;

        min = offset / LICHBD_CACHE_SPLIT;
        max = __lichbd_cache_size2split(offset + size);
        YASSERT(min != max);
        for (idx = min; idx < max; idx++) {
                YASSERT(idx < cache->count);
                chunk = &cache->chunk[idx];
                chunk->clock0++;
                chunk->last_update = gettime();
        }
}

static void __lichbd_cache_clock1(lichbd_cache_t *cache, size_t size, off_t offset)
{
        cache_chunk_t *chunk;
        uint32_t min, max, idx;

        if (!cache)
                return;

        min = offset / LICHBD_CACHE_SPLIT;
        max = __lichbd_cache_size2split(offset + size);
        YASSERT(min != max);
        for (idx = min; idx < max; idx++) {
                YASSERT(idx < cache->count);
                chunk = &cache->chunk[idx];
                chunk->clock1++;
                chunk->last_update = gettime();
        }
}

int lichbd_cache_write(lichbd_cache_t *cache, const buffer_t *buf, size_t size, off_t offset)
{
        int ret;
        cache_chunk_t *chunk;
        uint32_t min, max, idx;

        __lichbd_cache_clock0(cache, size, offset);

        ret = lichbd_rpc_write(buf, size, offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!cache) {
                goto out;
        }

        min = offset / LICHBD_CACHE_SPLIT;
        max = __lichbd_cache_size2split(offset + size);
        YASSERT(min != max);
        if (max - min == 1) {
                idx = min;
                chunk = &cache->chunk[idx];
                if (chunk->cached) {
                        DBUG("update cache cache %u\n", (int)size);

                        ret = __lichbd_cache_write(cache->name, idx, buf, size, offset);
                        if (unlikely(ret)) {
                                DWARN("update cache[%u] fail\n", (int)idx);
                        }
                }
        } else {
                for (idx = min; idx < max; idx++) {
                        YASSERT(idx < cache->count);
                        chunk = &cache->chunk[idx];
                        if (chunk->cached) {
                                chunk->cached = 0;
                                __lichbd_cache_drop(cache->name, idx);
                        }
                }
        }

out:
        __lichbd_cache_clock1(cache, size, offset);

        return 0;
err_ret:
        return ret;
}

static int __lichbd_cache_dump(lichbd_cache_t *cache)
{
        int ret, fd, i;
        uint64_t iops, max;
        char tmp[MAX_PATH_LEN], path[MAX_PATH_LEN];
        cache_chunk_t *chunk;
        time_t now;

        ANALYSIS_BEGIN(0);

        snprintf(tmp, MAX_PATH_LEN, "%s/analysis/%s.tmp", __home__, cache->name);
        fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 0640);
        if (fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        now = gettime();
        max = 0;
        for (i = 0; i < (int)cache->count; i++) {
                chunk = &cache->chunk[i];

                __lichbd_cache_update__(chunk, now, &iops);

                if (iops > 0) {
                        ret = write(fd, &iops, sizeof(iops));
                        if (ret < 0) {
                                ret = errno;
                                GOTO(err_fd, ret);
                        }

                        max = max > iops ? max : iops;
                }

                if (iops < cache->edge && chunk->cached) {
                        chunk->cached = 0;
                        __lichbd_cache_drop(cache->name, i);
                }
        }

        snprintf(path, MAX_PATH_LEN, "%s/analysis/%s.data", __home__, cache->name);
        rename(tmp, path);
        close(fd);

        snprintf(path, MAX_PATH_LEN, "%s/analysis/%s.max", __home__, cache->name);
        snprintf(tmp, MAX_PATH_LEN, "%llu", (LLU)max);
        ret = _set_text(path, tmp, strlen(tmp) + 1, O_CREAT | O_TRUNC);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, 1000 * 1, NULL);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

void lichbd_cache_check(lichbd_cache_t *cache)
{
        int ret;
        time_t now;
        char path[MAX_PATH_LEN], tmp[MAX_NAME_LEN];

        if (cache == NULL) {
                return;
        }

        now = gettime();

        if (now - cache->last_check > LICHBD_CACHE_CHECK_INTERVAL) {
                snprintf(path, MAX_PATH_LEN, "%s/analysis/edge", __home__);
                ret = _get_text(path, tmp, MAX_NAME_LEN);
                if (ret < 0) {
                        ret = errno;
                        if (ret == ENOENT) {
                                strcpy(tmp, "0");
                        } else
                                UNIMPLEMENTED(__DUMP__);
                }

                cache->last_check = now;
                cache->edge = atoi(tmp);

                DWARN("set edge %llu\n", (LLU)cache->edge);

                ret = __lichbd_cache_dump(cache);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }

        if (now - cache->last_load > LICHBD_CACHE_LOAD_INTERVAL) {
                cache->last_load = now;
                __lichbd_cache_load(cache);
        }
}
